package cn.doitedu.etl;

import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;

import java.util.Map;
import java.util.Objects;

/**
 * @Author: deep as the sea
 * @Site: <a href="http://www.51doit.com">多易教育</a>
 * @QQ: 657270652
 * @Date: 2022/12/10
 * @Tips: 学大数据，到多易教育
 * @Desc: 流量分析-访问时长分析，轻度聚合模型表计算任务
 **/
public class E03_EtlJob_TrafficAccTimeLongAgg {

    /**
     * Builds and runs the job: JSON events from Kafka topic {@code mall-evts-comdim-w} are keyed by
     * (user_id, session_id), enriched with session/page timing fields by
     * {@link PageAccessTimeFunction}, and inserted into the Doris table {@code dws.mall_tfc_acct}.
     *
     * <p>The Doris INSERT is submitted as its own Flink job by {@code tEnv.executeSql(...)}; this
     * method blocks on that job via {@code await()}. It deliberately does not call
     * {@code env.execute()}, which would submit a second, sink-less copy of the
     * Kafka -> map -> process pipeline.
     *
     * @param args unused
     * @throws Exception if job construction, submission, or execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:/d:/checkpoint");
        env.setParallelism(1);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        /*
         * Sample records for feeding the topic by hand when testing the logic:
         * {"user_id":1,"release_channel":"360应用市场","device_type":"mi6","session_id":"s01","event_id":"app_launch","event_time":1670652001000,"properties":{}}
         * {"user_id":1,"release_channel":"360应用市场","device_type":"mi6","session_id":"s01","event_id":"page_load","event_time": 1670652002000,"page_type":"pt01","properties":{"url":"pg01"}}
         * {"user_id":1,"release_channel":"360应用市场","device_type":"mi6","session_id":"s01","event_id":"add_cart","event_time":  1670652003000,"page_type":"pt01","properties":{"url":"pg01"}}
         * {"user_id":1,"release_channel":"360应用市场","device_type":"mi6","session_id":"s01","event_id":"push_back","event_time": 1670652004000,"page_type":"pt01","properties":{"url":"pg01"}}
         * {"user_id":1,"release_channel":"360应用市场","device_type":"mi6","session_id":"s01","event_id":"wake_up","event_time":   1670652010000,"page_type":"pt01","properties":{"url":"pg01"}}
         * {"user_id":1,"release_channel":"360应用市场","device_type":"mi6","session_id":"s01","event_id":"ad_click","event_time":  1670652012000,"page_type":"pt01","properties":{"url":"pg01"}}
         * {"user_id":1,"release_channel":"360应用市场","device_type":"mi6","session_id":"s01","event_id":"page_load","event_time": 1670652014000,"page_type":"pt02","properties":{"url":"pg02"}}
         * {"user_id":1,"release_channel":"360应用市场","device_type":"mi6","session_id":"s01","event_id":"ad_show","event_time":   1670652016000,"page_type":"pt02","properties":{"url":"pg02"}}
         * {"user_id":1,"release_channel":"360应用市场","device_type":"mi6","session_id":"s01","event_id":"ad_show","event_time":   1670652020000,"page_type":"pt02","properties":{"url":"pg02"}}
         */

        KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("doitedu:9092")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setGroupId("gpac01"+System.currentTimeMillis())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setTopics("mall-evts-comdim-w")
                .build();
        DataStreamSource<String> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "s");
        SingleOutputStreamOperator<EventBean> beans = ds.map(s -> JSON.parseObject(s, EventBean.class));

        SingleOutputStreamOperator<EventBean> resDs = beans
                .keyBy(new KeySelector<EventBean, Tuple2<Integer, String>>() {
                    @Override
                    public Tuple2<Integer, String> getKey(EventBean bean) throws Exception {
                        return Tuple2.of(bean.getUser_id(), bean.getSession_id());
                    }
                })
                .process(new PageAccessTimeFunction());

        // Register the enriched stream as a temporary view so SQL can read it.
        tEnv.createTemporaryView("res",resDs);

        // Doris connector table used as the result sink.
        tEnv.executeSql(
                " CREATE TABLE tfc_timelong_dorissink(         "
                        +"     dt  DATE                                  "
                        +"     ,user_id            INT                   "
                        +"     ,device_type        VARCHAR(20)           "
                        +"     ,release_channel    VARCHAR(20)           "
                        +"     ,session_id         VARCHAR(20)           "
                        +"     ,session_start      BIGINT                "
                        +"     ,session_cut_id     VARCHAR(20)           "
                        +"     ,session_cut_start  BIGINT                "
                        +"     ,page_start         BIGINT                "
                        +"     ,page_type          VARCHAR(20)           "
                        +"     ,page_url           VARCHAR(100)          "
                        +"     ,page_ac_index      INT                   "
                        +"     ,page_end           BIGINT                "
                        +" ) WITH (                                      "
                        +"    'connector' = 'doris',                     "
                        +"    'fenodes' = 'doitedu:8030',                "
                        +"    'table.identifier' = 'dws.mall_tfc_acct',  "
                        +"    'username' = 'root',                       "
                        +"    'password' = '',                           "
                        +"    'sink.label-prefix' = 'doris_tl"+System.currentTimeMillis()+"')"
        );

        // Insert the results into Doris.
        // Writing every record straight through like this puts heavy, high-frequency load on Doris
        // imports; consider a light time-window pre-aggregation in Flink whose logic mirrors the
        // Doris target table model.
        // executeSql submits the INSERT as its own job; await() keeps main alive until it ends
        // (replaces env.execute(), which would launch a duplicate, sink-less pipeline).
        tEnv.executeSql(
                "INSERT INTO   tfc_timelong_dorissink "
                        +"SELECT                                   "
                        +"    to_date(date_format(to_timestamp_ltz(session_start,3),'yyyy-MM-dd')) as dt "
                        +"    ,user_id                             "
                        +"    ,device_type                         "
                        +"    ,release_channel                     "
                        +"    ,session_id                          "
                        +"    ,session_start                       "
                        +"    ,session_cut_id                      "
                        +"    ,session_cut_start                   "
                        +"    ,page_start                          "
                        +"    ,page_type                           "
                        +"    ,page_url                            "
                        +"    ,page_ac_index                       "
                        +"    ,page_end                            "
                        +"from res                                 "
        ).await();
    }

    /**
     * Per (user_id, session_id) key, emits events enriched with session and page timing fields.
     *
     * <ul>
     *   <li>First event of a key: initializes session_start, session_cut_id/start, page_start,
     *       page_end and page_ac_index (1 if the event has a url, else 0) from the event itself.</li>
     *   <li>{@code page_load}: emits the previous page closed at this event's time (an
     *       interpolated record), then emits the new page's opening record with index + 1.</li>
     *   <li>{@code wake_up}: starts a new session cut and page start at this event's time.</li>
     *   <li>Anything else: extends page_end to this event's time.</li>
     * </ul>
     *
     * <p>The working bean is read from state once, mutated, emitted, and written back with
     * {@code update()} after every event. Writing back is required for serializing state backends
     * (where {@code value()} returns a copy) and to refresh the state TTL.
     */
    private static class PageAccessTimeFunction
            extends KeyedProcessFunction<Tuple2<Integer, String>, EventBean, EventBean> {

        /** The last emitted bean for the current key. */
        private transient ValueState<EventBean> state;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Once a user's session is over this state is useless, so let TTL clear it.
            // The default TTL update type refreshes the timer on writes, hence the update()
            // after every event in processElement.
            StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.minutes(30)).build();
            ValueStateDescriptor<EventBean> desc = new ValueStateDescriptor<>("last-bean", EventBean.class);
            desc.enableTimeToLive(ttlConfig);
            state = getRuntimeContext().getState(desc);
        }

        @Override
        public void processElement(EventBean eventBean,
                                   KeyedProcessFunction<Tuple2<Integer, String>, EventBean, EventBean>.Context ctx,
                                   Collector<EventBean> out) throws Exception {
            // properties may be absent from the incoming JSON; treat that as "no url".
            Map<String, String> props = eventBean.getProperties();
            String curUrl = props == null ? null : props.get("url");
            long curEventTime = eventBean.getEvent_time();

            // Read state once: on serializing backends every value() call returns a fresh copy.
            EventBean last = state.value();

            // Start of a new session for this key.
            if (last == null || !Objects.equals(last.getSession_id(), eventBean.getSession_id())) {
                eventBean.setPage_url(curUrl);
                eventBean.setPage_ac_index(curUrl == null ? 0 : 1);
                eventBean.setSession_start(curEventTime);
                eventBean.setSession_cut_id(eventBean.getSession_id() + "-" + curEventTime);
                eventBean.setSession_cut_start(curEventTime);
                eventBean.setPage_start(curEventTime);
                eventBean.setPage_end(curEventTime);

                out.collect(eventBean);
                state.update(eventBean);
                return;
            }

            String eventId = eventBean.getEvent_id();
            if ("page_load".equals(eventId)) {
                // Interpolated record: the new page's load time closes the previous page.
                last.setPage_end(curEventTime);
                out.collect(last);

                // Opening record of the new page: its type, url, start time and next index.
                last.setPage_type(eventBean.getPage_type());
                last.setPage_url(curUrl);
                last.setPage_start(curEventTime);
                last.setPage_ac_index(last.getPage_ac_index() + 1);
                out.collect(last);
            } else if ("wake_up".equals(eventId)) {
                // Wake-up keeps every previous attribute except the session cut id/start and the
                // page start/end times, which move to this event's time.
                last.setPage_end(curEventTime);
                last.setSession_cut_id(eventBean.getSession_id() + "-" + curEventTime);
                last.setSession_cut_start(curEventTime);
                last.setPage_start(curEventTime);
                out.collect(last);
            } else {
                // Any other event only extends the current page's end time.
                last.setPage_end(curEventTime);
                out.collect(last);
            }

            // Persist the mutations (and refresh the TTL timer).
            state.update(last);
        }
    }

    /**
     * One behaviour event as parsed from the Kafka JSON, plus the derived access-time fields filled
     * in by {@link PageAccessTimeFunction} and written to the Doris sink.
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class EventBean {
        int user_id;
        String device_type;
        String release_channel;
        String session_id;
        String page_type;
        String event_id;
        long event_time;
        Map<String, String> properties;   // raw event properties; "url" is read from here
        long session_start;               // event_time of the first event seen for the session
        String session_cut_id;            // session_id + "-" + start time of the current cut
        long session_cut_start;           // start time of the current session cut
        long page_start;                  // start time of the current page
        String page_url;                  // url of the current page
        int page_ac_index;                // sequence number of the visited page within a session
        long page_end;                    // latest event time observed on the current page
    }

}







